package com.wu.rocketmq_demo;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.messaging.Message;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Demo REST controller with one endpoint per RocketMQ messaging mode.
 *
 * <p>Every endpoint sends (or, for {@code poll}, receives) hard-coded sample
 * messages; results are printed to standard output.
 *
 * @author benjamin_5
 * @Description Sample endpoints under {@code demo} exercising {@link DefaultMQProducer}
 *              and {@link RocketMQTemplate}.
 * @date 2024/3/7
 */
@RestController
@RequestMapping("demo")
public class DemoController {

    @Resource
    private DefaultMQProducer mqProducer;
    @Resource
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Sends a fixed message to {@code topic_test} with tag {@code tag_test}
     * through the native producer, using a 1000 ms send timeout.
     *
     * @return {@code "success"} if the send call returns normally,
     *         {@code "fail"} if it throws
     */
    @GetMapping("send")
    public String send(){
        org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message("topic_test", "tag_test", "消息内容".getBytes(StandardCharsets.UTF_8));
        try {
            mqProducer.send(msg, 1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the calling thread can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return "fail";
        } catch (Exception e) {
            e.printStackTrace();
            return "fail";
        }
        return "success";
    }

    /**
     * Sends a Spring {@link Message} with a fixed string payload to {@code topic_test}.
     *
     * @return always {@code "success"} when the send call returns normally
     */
    @GetMapping("send2")
    public String send2(){
        Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ Normal_msg").build();
        rocketMQTemplate.send("topic_test",msg);
        return "success";
    }


    /**
     * Sends a plain string to {@code topic_test} via {@code convertAndSend}.
     *
     * <p>Mapped with {@code @RequestMapping} and no method restriction, unlike the
     * other endpoints which use {@code @GetMapping}.
     */
    @RequestMapping(value = "/send3")
    public void send3() {
        // Send an ordinary string message.
        rocketMQTemplate.convertAndSend("topic_test", "Hello Word");
    }

    /**
     * Synchronous send: sends a string to destination {@code topic_test:tag_test}
     * and prints the returned {@link SendResult}.
     */
    @GetMapping(value = "/syncSend")
    public void syncSend() {
        String message = "同步消息";
        SendResult sendResult = rocketMQTemplate.syncSend("topic_test:tag_test", message);
        System.out.println("发送结果："+sendResult);
    }

    /**
     * Asynchronous send: sends a string to {@code topic_test} and reports the
     * outcome through a {@link SendCallback}.
     */
    @GetMapping(value = "/aSyncSend")
    public void aSyncSend() {
        String message = "异步消息";
        // Callback invoked with the outcome of the send.
        SendCallback callback = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功");
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("发送失败");
                // Surface the cause instead of discarding it, so failures can be diagnosed.
                throwable.printStackTrace();
            }
        };
        rocketMQTemplate.asyncSend("topic_test", message, callback);
    }

    /**
     * One-way send: sends a string to {@code topic_test} without using any send result.
     */
    @GetMapping(value = "/sendOneWay")
    public void sendOneWay() {
        rocketMQTemplate.sendOneWay("topic_test", "发送单向消息");
    }

    /**
     * Ordered send: sends ten messages to {@code topic_order}, using
     * {@code i % 2} as the hash key passed to {@code syncSendOrderly}.
     */
    @GetMapping(value = "/sendOrder")
    public void sendOrder() {
        for (int i = 0; i < 10; i++) {
            // Simulate two groups: odd and even indices share a hash key each.
            rocketMQTemplate.syncSendOrderly("topic_order", "消息"+i, (i%2)+"");
        }
    }

    /**
     * Batch send: sends three messages to {@code topic_test} with tag {@code Tag}
     * in one {@code syncSend} call and prints the returned {@link SendResult}.
     */
    @GetMapping(value = "/sendBatch")
    public void sendBatch() {
        // The collection overload of syncSend is bounded to Spring Message elements.
        // A list of native RocketMQ messages would instead bind to syncSend(String, Object)
        // and be sent as a single serialized payload rather than as a batch.
        List<Message<String>> messages = new ArrayList<>();
        messages.add(MessageBuilder.withPayload("消息1").build());
        messages.add(MessageBuilder.withPayload("消息2").build());
        messages.add(MessageBuilder.withPayload("消息3").build());
        // The tag travels in the destination string ("topic:tag").
        SendResult sendResult = rocketMQTemplate.syncSend("topic_test:Tag", messages);
        System.out.println("发送结果："+sendResult);
    }

    /**
     * Delayed send: sends a message to {@code topic_test} with delay level 2
     * and prints the returned {@link SendResult}.
     */
    @GetMapping(value = "/sendDelay")
    public void sendDelay() {
        Message<String> msg = MessageBuilder.withPayload("延迟消息").build();
        // Third argument is the send timeout in milliseconds (3 s here); the fourth is the delay level.
        // NOTE(review): the actual delay for level 2 depends on the broker's delay-level table — confirm.
        SendResult sendResult = rocketMQTemplate.syncSend("topic_test", msg, 3000, 2);
        System.out.println("发送结果："+sendResult);
    }

    /**
     * Transactional send: sends a message to {@code topic_test} via
     * {@code sendMessageInTransaction} with a {@code null} argument and prints the result.
     */
    @GetMapping(value = "/sendTransaction")
    public void sendTransaction() {
        // NOTE(review): payload text is the same as in sendDelay; looks copy-pasted — confirm intent.
        Message<String> msg = MessageBuilder.withPayload("延迟消息").build();
        SendResult sendResult = rocketMQTemplate.sendMessageInTransaction("topic_test", msg, null);
        System.out.println("发送结果："+sendResult);
    }

    /**
     * Receives messages as strings via {@code rocketMQTemplate.receive} and prints each one.
     *
     * <p>NOTE(review): presumably requires a pull consumer to be configured for the
     * template — verify against the application configuration.
     */
    @GetMapping(value = "/poll")
    public void poll() {
        List<String> list = rocketMQTemplate.receive(String.class);
        for (String message : list) {
            System.out.println("poll消费："+message);
        }
    }
}
